python进阶

您所在的位置:网站首页 python进阶语法 嵩天 python进阶

python进阶

2024-01-20 01:49| 来源: 网络整理| 查看: 265

文章目录 进程multiprocessing进程创建--Process类进程间通信--Queue进程池--Pool进程池通信--Manager.Queue

进程 进程

具有一定独立功能的程序关于某个数据集合的一次运行活动。它是操作系统动态执行的基本单元,在传统的操作系统中,进程既是基本的分配单元,也是基本的执行单元.

程序是指令、数据及其组织形式的描述,进程是程序的实体.

在当代面向线程设计的计算机结构中,进程是线程的容器.

程是系统进行资源分配和调度的独立单元,线程是CPU调度和分配的基本单位。(一个qq程序可以理解为1个进程,1个qq聊天窗口可以理解为1个线程)

multiprocessing multiprocessing模块-基于进程的并行

python中的多线程无法利用多核优势,如果想要充分地使用多核CPU的资源,在python中大部分情况需要使用多进程。Python提供了multiprocessing。

multiprocessing模块用来开启子进程,并在子进程中执行我们定制的任务(比如函数),multiprocessing模块的功能众多:支持子进程、通信和共享数据、执行不同形式的同步,提供了Process、Queue、Pipe、Lock等组件。

参考文档:

官方文档:https://docs.python.org/3/library/multiprocessing.html

进程创建–Process类 # 构造方法 Process([group [, target [, name [, args [, kwargs]]]]])   group: 线程组,目前还没有实现,库引用中提示必须是None;   target: 要执行的方法;   name: 进程名;   args/kwargs: 要传入方法的参数。 # 实例方法   is_alive():返回进程是否在运行。   join([timeout]):阻塞当前上下文环境的进程程,直到调用此方法的进程终止或到达指定的timeout(可选参数)。   start():进程准备就绪,等待CPU调度   run():strat()调用run方法,如果实例进程时未制定传入target,这star执行t默认run()方法。   terminate():不管任务是否完成,立即停止工作进程 # 属性   authkey   daemon:和线程的setDeamon功能一样   exitcode(进程在运行时为None、如果为–N,表示被信号N结束)   name:进程名字   pid:进程号

方式一:直接传入要运行的方法创建多进程

from multiprocessing import Process import threading import time def foo(i): print('say hi', i) if __name__ == '__main__': for i in range(10): p = Process(target=foo, args=(i,)) p.start() > say hi 2 say hi 0 say hi 1 say hi 4 say hi 3 say hi 5 say hi 6 say hi 7 say hi 8 say hi 9

方式二:继承Process类并重写run()方法

from multiprocessing import Process import time class MyProcess(Process): def __init__(self, arg): super(MyProcess, self).__init__() self.arg = arg def run(self): print('say hi', self.arg) time.sleep(1) if __name__ == '__main__': for i in range(10): p = MyProcess(i) p.start()

进程id及进程关系查看

from multiprocessing import Process import os def info(title): print(title) print('module name:', __name__) # 得到父亲进程的id print('parent process:', os.getppid()) # 得到本身进程的id print('process id:', os.getpid()) def f(name): info('function f') print('hello', name) if __name__ == '__main__': info('main line') p = Process(target=f, args=('bob',)) p.start() p.join() > main line module name: __main__ parent process: 54134 process id: 54137 function f module name: __mp_main__ parent process: 54137 process id: 54139 hello bob 进程间通信–Queue

进程间通信

必要性: 进程间空间独立,资源不共享(无法共享全局变量),此时在需要进程间数据传输时就需要特定的手段进行数据通信。常用进程间通信方法:管道通信、消息队列、共享内存、信号量使用multiprocessing.Queue可以在进程间通信,但不能在Pool池创建的进程间进行通信

multiprocessing.Queue()和queue.Queue区别

queue.Queue是进程内(线程)非阻塞队列,multiprocessing.Queue()是跨进程通信队列queue.Queue各进程私有,multiprocessing.Queue各子进程共有。 # 进程通信--multiprocessing.Queue() from multiprocessing import Process, Queue def work1(q): while True: if not q.empty(): n = q.get() print('work1正在执行{}的平方,结果是{}'.format(n,str(int(n)*int(n)))) else: break def work2(q): while True: if not q.empty(): n = q.get() print('work2正在执行{}的平方,结果是{}'.format(n,str(int(n)*int(n)))) else: break if __name__ == "__main__": q = Queue() for i in range(10): q.put(i) p1 = Process(target=work1, args=(q,)) # 进程间的Queue()需要当作参数传入 p2 = Process(target=work2, args=(q,)) p1.start() p2.start() p1.join() p2.join() > work2正在执行0的平方,结果是0 work2正在执行1的平方,结果是1 work2正在执行2的平方,结果是4 work1正在执行3的平方,结果是9 work2正在执行4的平方,结果是16 work1正在执行5的平方,结果是25 work2正在执行6的平方,结果是36 work1正在执行7的平方,结果是49 work2正在执行8的平方,结果是64 work2正在执行9的平方,结果是81 # 生产者消费者模型 from multiprocessing import Process, Queue, Pool import time def producer(queue): queue.put('A') time.sleep(2) def consumer(queue): time.sleep(2) data = queue.get() print(data) if __name__ == '__main__': queue= Queue() p = Process(target=producer, args=(queue,)) c = Process(target=consumer, args=(queue,)) p.start() c.start() p.join() c.join() 进程池–Pool 进程池可以控制进程的数量,重复利用进程对象,减少创建和销毁进程的开销。创建进程池可以接收一个参数,这个参数可以设置进程池的最大值。就是指定有几个子进程在“同时”进行。(为什么说是“同时”,因为在并发的情况下,子进程和主进程是按照时间片轮寻的方式执行的,只是切换得过快,并不是真的一起运行。)

Pool常用方法

apply_async(func[, args[, kwds]]) :使用非阻塞方式调用func(并行执行,堵塞方式必须等待上一个进程退出才能执行下一个进程),args为传递给func的参数列表,kwds为传递给func的关键字参数列表; apply(func[, args[, kwds]]):使用阻塞方式调用func close():关闭Pool,使其不再接受新的任务; terminate():不管任务是否完成,立即终止; join():主进程阻塞,等待子进程的退出, 必须在close或terminate之后使用; import multiprocessing import os import random import time def run_time(index): start_time = time.time() time.sleep(random.random()) print("任务%d 任务id为%d 任务运行的时间为%0.2f" % (index,os.getpid(), time.time()-start_time)) if __name__ == '__main__': pool = multiprocessing.Pool(3) #创建进程池,并设置进程数量为3 for i in range(10): pool.apply_async(func=run_time,args=(i,)) # 启动10个进程 pool.close() # 必须先关闭进程池,不再让它接收新的进程,才能进行下一步的阻塞。(已经在排队的进程不算新进程了!例如上面的例子,10个子进程已经在进程池排队,所以join方法会阻塞直到10个子进程执行完成。) pool.join() > 任务2 任务id为60426 任务运行的时间为0.22 任务0 任务id为60424 任务运行的时间为0.38 任务1 任务id为60425 任务运行的时间为0.96 任务3 任务id为60426 任务运行的时间为0.89 任务4 任务id为60424 任务运行的时间为0.94 任务5 任务id为60425 任务运行的时间为0.42 任务6 任务id为60426 任务运行的时间为0.30 任务9 任务id为60426 任务运行的时间为0.44 任务8 任务id为60425 任务运行的时间为0.89 任务7 任务id为60424 任务运行的时间为0.98 > 从结果可以看出,只启动了3个进程去执行任务,其他进程都在排队 进程池通信–Manager.Queue

使用multiprocessing.Manager.Queue可以在Pool进程池创建的进程间进行通信

参考文档:https://www.jb51.net/article/182551.htm from multiprocessing import Process, Queue, Pool, Manager import time def producer(queue): queue.put('A') time.sleep(2) def consumer(queue): time.sleep(2) data = queue.get() print("consumer:%s" % data) if __name__ == '__main__': # queue = Queue(10) # 这个是使用multiprocessing.Queue,无效 queue = Manager().Queue(10) # 这个是使用multiprocessing.Manager.Queue, 可以 pool = Pool(2) pool.apply_async(producer, args=(queue,)) pool.apply_async(consumer, args=(queue,)) pool.close() pool.join() > consumer:A


【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3